package cn._51doit.live.udfs;

import cn._51doit.live.pojo.DataBean;
import com.alibaba.fastjson.JSON;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

/**
 * Converts {@code (id, json)} pairs into {@link DataBean} instances.
 *
 * <p>For each input tuple, {@code f1} is parsed as JSON into a {@link DataBean}, the bean's id
 * is set to {@code f0}, and the bean is emitted on the main output.
 *
 * <p>Records that cannot be converted are not emitted on the main output. Instead of being
 * discarded silently, they are forwarded unchanged to the side output identified by
 * {@link #MALFORMED_TAG}, so the job can record them (for example by sinking that side output
 * to a file). If the job does not consume the side output, rejected records are simply dropped.
 */
public class JsonToBeanFunctionV2 extends ProcessFunction<Tuple2<String, String>, DataBean> {

    /**
     * Side-output tag carrying the original {@code (id, json)} tuples that could not be turned
     * into a {@link DataBean}. Declared as an anonymous subclass so the element type is retained.
     */
    public static final OutputTag<Tuple2<String, String>> MALFORMED_TAG =
            new OutputTag<Tuple2<String, String>>("malformed-json") {};

    /**
     * Parses one record and emits the resulting bean, or routes the record to
     * {@link #MALFORMED_TAG} when it cannot be parsed.
     *
     * @param tp  input tuple: {@code f0} is the id to assign, {@code f1} is the JSON text
     * @param ctx context used to emit rejected records to the side output
     * @param out collector for successfully converted beans
     * @throws Exception if emitting a record fails; such failures are deliberately not caught
     *                   here so that downstream errors are not mistaken for bad input
     */
    @Override
    public void processElement(Tuple2<String, String> tp, Context ctx, Collector<DataBean> out) throws Exception {
        if (tp == null) {
            // Nothing to convert and nothing meaningful to record.
            return;
        }

        DataBean bean = toBean(tp.f0, tp.f1);
        if (bean != null) {
            // Emitted outside of any try/catch: a failure here is not a data-quality problem.
            out.collect(bean);
            return;
        }

        if (tp.f1 != null) {
            // Keep the raw record available for inspection instead of dropping it silently.
            ctx.output(MALFORMED_TAG, tp);
        }
    }

    /**
     * Builds a bean from the JSON text and assigns the given id.
     *
     * @param id   id to set on the parsed bean
     * @param line JSON text to parse
     * @return the populated bean, or {@code null} when the text could not be converted
     */
    private static DataBean toBean(String id, String line) {
        try {
            DataBean parsed = JSON.parseObject(line, DataBean.class);
            if (parsed == null) {
                // The parser may yield null rather than throw (for example for empty text).
                return null;
            }
            parsed.setId(id);
            return parsed;
        } catch (Exception ignored) {
            // Best-effort conversion: a bad record must not fail the job. The caller
            // forwards the raw record to the side output so the data is not lost.
            return null;
        }
    }
}